# coding=utf8
import json
import selectors
import socket
from logging import getLogger
from queue import Queue

import settings
from libs.executor import Executor

# Address of the background-task server: bound by run_server() and dialed by submit().
SERVER = settings.BGTASK_SERVER
# Passed to the Executor below as its ``workers`` argument.
WORK_THREADS = settings.BGTASK_WORK_THREADS
# Maximum size of the Queue handed to the Executor.
_queue_size = 100

log = getLogger("bgtask")
# Module-wide selector shared by accept(), read() and run_server().
sel = selectors.DefaultSelector()
# Executor that read() submits incoming tasks to.
executor = Executor(Queue(_queue_size), workers=WORK_THREADS)


def read_all(conn) -> bytes:
    """Read one request from ``conn`` and return it without trailing newlines.

    Data is received in 2048-byte chunks. Reading stops as soon as one of the
    following happens:

    * a chunk shorter than 2048 bytes arrives (the sender paused or finished),
    * the data received so far ends with the two-newline terminator that
      ``submit`` appends to every request,
    * the peer closes the connection (``recv`` returns an empty chunk),
    * the socket is non-blocking and has nothing more to deliver right now
      (``BlockingIOError``).

    The last three conditions matter when the payload length is an exact
    multiple of the chunk size: without them the extra ``recv`` would raise on
    a non-blocking socket or block forever on a blocking one.

    NOTE: a request that arrives split across several network segments can
    still be returned incomplete, because this function never waits for data
    that has not been delivered yet.

    :param conn: socket-like object exposing ``recv(bufsize)``.
    :return: the received bytes with all trailing ``\\n`` bytes removed;
        ``b""`` when nothing was received.
    """
    chunk_size = 2048
    terminator = b"\n\n"
    chunks = []
    tail = b""  # last bytes received so far, enough to spot the terminator

    while True:
        try:
            buf = conn.recv(chunk_size)
        except BlockingIOError:
            # Non-blocking socket with no pending data: return what we have.
            break
        if not buf:
            # Orderly shutdown by the peer.
            break

        chunks.append(buf)
        # Keep a small tail so a terminator split across two chunks is seen.
        tail = (tail + buf)[-len(terminator):]
        if len(buf) < chunk_size or tail == terminator:
            break

    # rstrip removes every trailing newline byte, not just one terminator.
    return b"".join(chunks).rstrip(b"\n")


def accept(sock, mask):
    """Selector callback for the listening socket.

    Accepts the pending connection, switches it to non-blocking mode and
    registers it with the module-wide selector so that ``read`` is invoked
    once the client has sent data.

    :param sock: the listening socket that became readable.
    :param mask: event mask supplied by the selector (unused).
    """
    client, _peer = sock.accept()
    client.setblocking(False)
    sel.register(client, selectors.EVENT_READ, data=read)


def read(conn, mask):
    """Selector callback for a client connection: handle one task request.

    Reads the request, decodes it as UTF-8 JSON with the keys ``fn``, ``args``
    and ``kwargs``, and submits it to the executor. The reply is ``b"ok"``
    followed by the task id on success and ``b"error"`` on any failure,
    including malformed or empty input.

    The connection is always unregistered from the selector and closed, even
    when parsing, submitting or replying fails, so one bad client can neither
    leak a socket nor bring down the server loop.

    :param conn: the client socket that became readable.
    :param mask: event mask supplied by the selector (unused).
    """
    try:
        try:
            request = json.loads(read_all(conn).decode("utf8"))
            task_id = executor.submit(
                request["fn"], *request["args"], **request["kwargs"]
            )
            reply = b"ok" + task_id.encode("utf8")
        except Exception as e:
            # Boundary handler: any bad request is reported to the client
            # instead of propagating into the event loop.
            log.error(f"server error: {e}", exc_info=True)
            reply = b"error"

        try:
            conn.send(reply)
        except OSError as e:
            # The client may already be gone; nothing more can be done.
            log.error(f"failed to send reply: {e}", exc_info=True)
    finally:
        sel.unregister(conn)
        conn.close()


def run_server():
    """Run the background-task server until an exception stops it.

    Binds a listening socket to ``SERVER``, registers it with the module-wide
    selector with ``accept`` as its callback, logs a start-up banner and then
    dispatches selector events forever. Each event's ``data`` is the callback
    to invoke with ``(fileobj, mask)``.

    When the loop is left through an exception (for example
    ``KeyboardInterrupt``), the listening socket is unregistered from the
    selector and closed so the address is released and the shared selector
    does not keep a dead file object.
    """
    with socket.socket() as sock:
        sock.bind(SERVER)
        sock.listen(100)
        sock.setblocking(False)
        sel.register(sock, selectors.EVENT_READ, accept)
        try:
            log.info("""\n
    .########...######...########....###.....######..##....##
    .##.....##.##....##.....##......##.##...##....##.##...##.
    .##.....##.##...........##.....##...##..##.......##..##..
    .########..##...####....##....##.....##..######..#####...
    .##.....##.##....##.....##....#########.......##.##..##..
    .##.....##.##....##.....##....##.....##.##....##.##...##.
    .########...######......##....##.....##..######..##....##
    """)

            while True:
                for key, mask in sel.select():
                    callback = key.data
                    callback(key.fileobj, mask)
        finally:
            sel.unregister(sock)


def submit(fn, *args, **kwargs):
    """Send a task to the background-task server and return its task id.

    The request is a UTF-8 JSON object with the keys ``fn``, ``args`` and
    ``kwargs``, followed by two newline bytes. The server answers with
    ``b"ok"`` plus the task id on success.

    :param fn: a callable object, or a string in the format
        ``package.module:name``. A callable is converted to that string form
        from its ``__module__`` and ``__name__``.
    :param args: positional arguments for the task; must be JSON-serializable.
    :param kwargs: keyword arguments for the task; must be JSON-serializable.
    :return: the task id as ``str``, or ``None`` when the server did not
        answer with ``ok``.
    :raises OSError: if the server cannot be reached or the transfer fails.
    :raises TypeError: if the arguments cannot be serialized to JSON.
    """
    if callable(fn):
        fn = f"{fn.__module__}:{fn.__name__}"

    request = {
        "fn": fn,
        "args": args,
        "kwargs": kwargs,
    }
    # Serialize before connecting so bad arguments never open a connection.
    payload = json.dumps(request).encode("utf8") + b"\n\n"

    # The context manager closes the socket; no explicit close() is needed.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(SERVER)
        s.sendall(payload)
        reply = s.recv(512)

    ok_prefix = b"ok"
    if reply.startswith(ok_prefix):
        # Slice off the prefix exactly once. bytes.lstrip(b"ok") would strip
        # any leading 'o'/'k' bytes and corrupt ids starting with them.
        return reply[len(ok_prefix):].decode("utf8")
    return None


# Start the server when this module is executed as a script.
if __name__ == '__main__':
    run_server()
